HEX
Server: LiteSpeed
System: Linux eticaretsrv4.isimtescil.net 3.10.0-962.3.2.lve1.5.26.7.el7.x86_64 #1 SMP Wed Oct 2 07:53:12 EDT 2019 x86_64
User: sioberen (1086)
PHP: 7.3.33
Disabled: NONE
Upload Files
File: //opt/alt/python37/lib/python3.7/site-packages/cllimits/clquota_lib.py
# -*- coding: utf-8 -*-

# clquota.py - module for interfacing with the cl-quota utility to get/set users' quotas
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import absolute_import
import csv
from cllimits.lib import exec_utility
from clcommon.clexception import FormattedException
from clcommon.utils import run_command, ExternalProgramFailed
from cllimits.lib.utils import is_quota_supported, is_quota_active


class ClQuotaException(FormattedException):
    """Error raised by ClQuota when quota data can't be loaded or limits can't be set."""


class ClQuota:
    """
    Wrapper around the cl-quota utility for reading and changing users' inodes limits.

    Limits printed by ``cl-quota --csv`` are loaded on first use and cached in the
    instance as ``uid (str) --> (soft_limit, hard_limit)``. The record of uid '0'
    is used as the default for users without a record of their own.
    """
    _CL_QUOTA_PATH = '/usr/bin/cl-quota'
    _REPQUOTA_PATH = '/usr/sbin/repquota'
    # True once limits were changed through this object: cached data is then
    # re-read from cl-quota before it is handed out again
    _is_cache_stale = False

    def __init__(self):
        # inodes limits data
        # uid --> (soft_limit, hard_limit)
        self._inodes_limits = None
        # Cached limits are up to date until this object changes them
        self._is_cache_stale = False
        # Error flag
        self._is_clquota_error = False
        # Check if quota is supported
        self._is_clquota_present = is_quota_supported(self._CL_QUOTA_PATH, self._REPQUOTA_PATH)
        # Check if quota is activated
        self._is_clquota_activated = is_quota_active(self._CL_QUOTA_PATH, self._REPQUOTA_PATH)

    def is_clquota_present(self):
        """
        Get quota presence flag
        :return: True/False - quotas present/not present
        """
        return self._is_clquota_present

    def is_clquota_activated(self):
        """
        Get quota activated flag
        :return: True/False - quotas activated/not activated
        """
        return self._is_clquota_activated

    def get_inodes_limits_by_uid(self, user_id):
        """
        Retrieve inodes limits by uid
        :param user_id: Supplied uid
        :return: tuple (soft_limit, hard_limit). Limits of uid 0 are returned
            if there is no record for the supplied uid
        :raises ClQuotaException: if quota is disabled or cl-quota data can't be loaded
        """
        if self._is_cache_stale:
            # Limits were changed through this object after the last load,
            # so drop the cached data and read it again
            self._inodes_limits = None
            self._is_cache_stale = False
        # Load inodes data if needed
        self._load_info()
        try:
            return self._inodes_limits[str(user_id)]
        except KeyError:
            return self._inodes_limits['0']

    def set_user_inodes_limits(self, uid, limits, force=False):
        """
        Set inodes limits for user uid
        :param: int uid: user id
        :param: limits: new inodes limits. One of:
            - string "unlimited" or "default" (applied to both soft and hard limit);
            - string "soft,hard";
            - tuple/list of two items (soft, hard).
            Every item must be a non-negative integer or the string "default"
        :param: bool force: save limits if even they are equal to defaults
        :return: None
        :raises ClQuotaException: if quota is disabled, arguments aren't acceptable
            or cl-quota call failed
        """
        # Raises if quota can't be used; data itself isn't needed here
        self._load_info()
        if not isinstance(uid, int) or uid < 0:
            exc_message = {'message': "User id '%(uid)s' isn't a positive integer",
                           'context': {'uid': uid}}
            raise ClQuotaException(exc_message)
        if limits == "unlimited" or limits == "default":
            # Keywords are passed to cl-quota as is for both limits
            soft, hard = limits, limits
        else:
            pair = self._split_limits(limits)
            if pair is None:
                exc_message = {'message': "Limits %(limits)s aren't acceptable.",
                               'context': {'limits': limits}}
                raise ClQuotaException(exc_message)
            checked = []
            for value in pair:
                try:
                    checked.append(self._normalize_limit(value))
                except ValueError:
                    exc_message = {'message': "Limit value '%(limit)s' isn't a positive integer or string %(default)s",
                                   'context': {'limit': value, 'default': 'default'}}
                    raise ClQuotaException(exc_message)
            # Use validated values, so stray spaces etc. don't get into the command line
            soft, hard = checked
        cmd = [self._CL_QUOTA_PATH,
               '--user-id=%d' % uid,
               '--soft=%s' % soft,
               '--hard=%s' % hard]
        if force:
            cmd.append("--force")
        try:
            run_command(cmd)
        except ExternalProgramFailed as e:
            raise ClQuotaException(str(e))
        # Cached limits don't reflect the change just made
        self._is_cache_stale = True

    def set_user_inodes_limits_unlimited(self, uid):
        """
        Set unlimited inodes limits for user uid
        :param: int uid: user id
        :return: None
        """
        self.set_user_inodes_limits(uid, "unlimited")

    def reset_user_inodes_limits(self, uid):
        """
        Set default inodes limits for user uid
        :param: int uid: user id
        :return: None
        """
        self.set_user_inodes_limits(uid, "default")

    @staticmethod
    def _split_limits(limits):
        """
        Bring limits argument to a two-items list
        :param limits: tuple/list or "soft,hard" string
        :return: list [soft, hard] or None if limits don't consist of exactly two items
        """
        if isinstance(limits, (tuple, list)):
            pair = list(limits)
        elif isinstance(limits, str):
            pair = limits.split(',')
        else:
            return None
        if len(pair) != 2:
            return None
        return pair

    @staticmethod
    def _normalize_limit(value):
        """
        Validate single limit value
        :param value: limit value as supplied by caller
        :return: string "default" or non-negative int
        :raises ValueError: if value is neither of them
        """
        if isinstance(value, str):
            value = value.strip()
        if value == "default":
            return value
        try:
            number = int(value)
        except (TypeError, OverflowError):
            # e.g. None or infinity: report as a bad value like any other
            raise ValueError(value)
        if number < 0:
            raise ValueError(value)
        return number

    @staticmethod
    def _parse_quota_csv(csv_text):
        """
        Parse text printed by cl-quota --csv
        :param csv_text: utility output, one comma-separated record per line
        :return: dict uid (str) --> (soft_limit, hard_limit);
            None if output contains a record with 'ERROR' field
        :raises IndexError, ValueError: if a record has less than four fields
            or its limits aren't integers
        """
        limits = dict()
        for row in csv.reader(csv_text.split('\n'), delimiter=','):
            # Blank lines (e.g. after the trailing newline) carry no data
            if not any(field.strip() for field in row):
                continue
            if 'ERROR' in row:
                return None
            # Pass header line
            if 'id' in row:
                continue
            limits[row[0]] = (int(row[2]), int(row[3]))
        return limits

    def _load_info(self):
        """
        Loads users info from cl-quota
        :return: None
        :raises ClQuotaException: if quota isn't present/activated, cl-quota failed
            or its output has no record for uid 0
        """
        # Exit if cl-quota data already loaded
        if self._inodes_limits is not None:
            return
        disabled_exc_message = {'message': "%(util)s is disabled",
                                'context': {'util': 'Quota'}}
        # Exit if cl-quota not present or cl-quota error
        if not self._is_clquota_present or not self._is_clquota_activated or self._is_clquota_error:
            raise ClQuotaException(disabled_exc_message)
        # Get all cl-quota limits and parse them
        ret_code, s_quota_limits = exec_utility(self._CL_QUOTA_PATH, ['--csv'])
        if ret_code != 0:
            # Error
            self._is_clquota_error = True
            raise ClQuotaException(disabled_exc_message)
        # Data is collected in a local dictionary and stored in the object only when
        # it is complete, so a failure never leaves partially loaded limits behind
        limits = self._parse_quota_csv(s_quota_limits)
        if limits is None:
            # Error reported by cl-quota
            self._is_clquota_error = True
            raise ClQuotaException(disabled_exc_message)
        if '0' not in limits:
            # Error
            self._is_clquota_error = True
            exc_message = {'message': "There is no %(what)s found in %(where)s",
                           'context': {'what': 'default settings', 'where': '%s output' % self._CL_QUOTA_PATH}}
            raise ClQuotaException(exc_message)
        self._inodes_limits = limits
        self._is_cache_stale = False

    def reset_inodes_limits(self):
        """
        Reset inodes limits for all users to package limits
        :return:
        """
        if not self._is_clquota_present or not self._is_clquota_activated:
            return
        # Result of the call isn't checked
        exec_utility(self._CL_QUOTA_PATH, ['--sync'])
        # Limits could have been changed by sync, cached data can't be trusted anymore
        self._is_cache_stale = True